package com.atguigu.realtime.apps

import java.time.LocalDate

import com.alibaba.fastjson.JSON
import com.atguigu.common.constants.TopicConstant
import com.atguigu.common.utils.PropertiesUtil
import com.atguigu.realtime.beans.{ActionLog, CouponAlertInfo}
import com.atguigu.realtime.utils.{DStreamUtil, DateHandleUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks

/**
 * Created by Smexy on 2022/6/29
 *
 *    在SparkStreaming中有三个和时间相关的字段:
 *          ①batchduration:   每个批次采集的时间范围。 每个批次采集时间范围内采集的数据会封装为一个RDD
 *          ②window       :   要计算的数据横跨的时间范围。 会将这个时间范围内涉及的所有的批次的RDD合并(union)为
 *                            一个RDD
 *          ③slide        ：   每间隔多久计算一次
 *
 *          要求：  window和slide必须是 batchduration的整倍数。
 *                默认情况，不指定上述二者，和batchduration 一致。
 *
 *
 *     -------------------------------
 *       val context new StreamingContext(xxx,batchduration):
 *            这个context，没间隔 batchduration 提交一次Job，提交的Job计算 batchduration
 *                  时间范围内，产生的以 batchduration时间作为一个批次的所有批次的数据
 *
 *      -----------------------------
 *          batchduration = 10s
 *          window = 30s
 *          slide = 20s
 *
 *     --------------------------
 *        使用幂等输出实现
 *
 */
object AlertApp extends BaseApp {
  override var appName: String = "AlertApp"
  override var groupId: String = "realtime220212"
  override var topic: String = TopicConstant.ACTION_LOG
  override var batchDuration: Int = 30

  def main(args: Array[String]): Unit = {

    //写入ES，需要在Configuration中添加ES集群的一些参数
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(appName)

    sparkConf.set("es.nodes",PropertiesUtil.getProperty("es.nodes"))
    sparkConf.set("es.port",PropertiesUtil.getProperty("es.port"))
    sparkConf.set("es.index.auto.create", "true")
    //允许使用主机名(域名)访问es节点
    sparkConf.set("es.nodes.wan.only", "true")

    context = new StreamingContext(sparkConf,Seconds(batchDuration))

    runApp{

      val ds: InputDStream[ConsumerRecord[String, String]] = DStreamUtil.createDS(context, groupId, topic)

      //Driver端声明
      var ranges: Array[OffsetRange] = null
      //先获取偏移量
      val ds1: DStream[ActionLog] = ds.transform(rdd => {

        //本身就在Driver端获取
        ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd.map(record => JSON.parseObject(record.value(), classOf[ActionLog]))

      })

      //开窗5分钟
      val ds2: DStream[(String, Iterable[Iterable[ActionLog]])] = ds1.window(Minutes(5))
        .map(log => ((log.mid, log.uid), log))
        .groupByKey()
        .filter {
          case ((mid, uid), logs) => {
            //只留下修改了收货地址的 用户及其logs
            var flag = false

            Breaks.breakable {

              logs.foreach(log => {

                if ("trade_add_address".equals(log.action_id)) {
                  flag = true
                  //跳出循环
                  Breaks.break()
                }
              })

            }

            flag
          }
        }
        .map {
          case ((mid, uid), logs) => (mid, logs)
        }.groupByKey()

      //判断这个设备上，增加收货地址的用户个数是否超过2个，超过就留下
      val ds3: DStream[(String, Iterable[ActionLog])] = ds2.filter(_._2.size >= 2)
        .mapValues(_.flatten)


      //产生预警日志
      val ds4: DStream[CouponAlertInfo] = ds3.map {
        case (mid, logs) => {

          val uids: mutable.Set[String] = new mutable.HashSet[String]
          val itemIds: mutable.Set[String] = new mutable.HashSet[String]
          val events: ListBuffer[String] =  new ListBuffer[String]
          //封装预警日志
          logs.foreach(log => {

            uids.add(log.uid)
            events.append(log.action_id)
            // eq比较对象内存中的地址值
            // equals比较字符串的内容是否相等
            if ("favor_add".equals(log.action_id)) {
              //记录收藏的商品的id
              itemIds.add(log.item)
            }


          })

          /*
              封装ts和id(体现mid)

              同一设备，如果一分钟产生多条预警，只保留最后一条预警日志

              在ES中覆盖的前提示两条数据的ID一样

              mid101_2022-06-29 10:10:10, log1
                  mid101_2022-06-29 10:10,log1
              mid101_2022-06-29 10:10:30, log2
                  mid101_2022-06-29 10:10,log2

              id: mid_ts的分钟部分

           */
          val ts: Long = System.currentTimeMillis()

          val dateMinite: String = DateHandleUtil.parseMillTsToDateMiniteTime(ts)

          CouponAlertInfo(mid + "_" + dateMinite, uids, itemIds, events, ts)


        }
      }

      //只要导入了elasticsearch-hadoop，可以使用ES官方提供的方法

      import  org.elasticsearch.spark._
      //写入ES
      ds4.foreachRDD(rdd => {

        rdd.cache()

        println("即将写出:"+rdd.count())

        /*
            resource： 要写入的index。
                      每一天的预警日志，存在当天的index中。
            cfg:  写入的配置
                      必须要指定的是：  es.mapping.id ->   RDD中样例类的哪个字段作为 ES 目标Index中的_id


             数据漂移问题：
                          当前消费的这批数据是 2022-06-29:23:59:59产生的，在计算到 181行时，已经是  2022-06-30，
                          这条预警，会保存在  realtime2022_behaviour_alert2022-06-30中，
                          应该保存在  realtime2022_behaviour_alert2022-06-29中，
                          怎么办？
                                不用办！ 在ES中，可以通过解析每条日志的ts，自动获取日志所产生的日期。


         */
        rdd.saveToEs("realtime2022_behaviour_alert"+LocalDate.now(),Map("es.mapping.id" -> "id"))

        //提交偏移量
        ds.asInstanceOf[CanCommitOffsets].commitAsync(ranges)


      })

    }
  }
}
